Flink Series - Real-Time Data Warehouse: Writing to ClickHouse in Real Time with Flink and Building a Live Dashboard (Part 4)
Guest speaker: lbship
Compiled and edited by: 仙子紫霞
Published by: 数据仓库与Python大数据
Main text
package TopNitems

import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import scala.io.Source
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import Array._
import scala.util.Random.shuffle

// Mock data generator: sends a random "member_id \t goods \t timestamp" record
// to the Kafka topic every 2 seconds.
object KafkaProducers {
  def main(args: Array[String]): Unit = {
    SendtoKafka("test")
  }

  def SendtoKafka(topic: String): Unit = {
    val pro = new Properties()
    pro.put("bootstrap.servers", "192.168.226.10:9092")
    pro.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    pro.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](pro)
    val member_id = range(1, 10)
    val goods = Array("Milk", "Bread", "Rice", "Noodles", "Cookies", "Fish", "Meat", "Fruit", "Drink", "Books", "Clothes", "Toys")
    while (true) {
      val ts = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
      val msg = shuffle(member_id.toList).head + "\t" + shuffle(goods.toList).head + "\t" + ts + "\t" + "\n"
      print(msg)
      val record = new ProducerRecord[String, String](topic, msg)
      producer.send(record)
      Thread.sleep(2000)
    }
    // Alternative: replay an existing dataset instead of generating mock records
    // val source = Source.fromFile("C:\\UserBehavior.csv")
    // for (line <- source.getLines()) {
    //   producer.send(new ProducerRecord[String, String](topic, line))
    // }
    producer.close()
  }
}
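Before wiring up Flink, it can be worth confirming that the mock records actually reach the topic. The short consumer below is a sketch that is not part of the original article; it reuses the broker address and topic name from the producer above and assumes a kafka-clients version of 2.0 or later (for poll(Duration)).

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

object KafkaConsumerCheck {
  def main(args: Array[String]): Unit = {
    val pro = new Properties()
    pro.put("bootstrap.servers", "192.168.226.10:9092")
    pro.put("group.id", "check")
    pro.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    pro.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    pro.put("auto.offset.reset", "earliest")
    val consumer = new KafkaConsumer[String, String](pro)
    consumer.subscribe(Collections.singletonList("test"))
    while (true) {
      // poll once per second and print whatever the producer has written
      val it = consumer.poll(Duration.ofSeconds(1)).iterator()
      while (it.hasNext) print(it.next().value())
    }
  }
}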
Flink offers two JDBC connector artifacts, and their API coverage differs:

API | flink-jdbc | flink-connector-jdbc
---|---|---
DataStream | Not supported | Supported
Table API (Legacy) | Supported | Not supported
Table API (DDL) | Not supported | Not supported

Since this pipeline uses the DataStream API, it needs the newer flink-connector-jdbc (introduced in Flink 1.11) together with the ClickHouse JDBC driver:
<!-- flink-connector-jdbc and the ClickHouse JDBC driver -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.11.2</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>
<!-- Flink Table API related dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
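The sink program below inserts into ChinaDW.testken, which must already exist in ClickHouse. The original article does not show the table definition, so here is a minimal creation sketch that goes through the same clickhouse-jdbc driver; the column types (Int32, String, DateTime), the MergeTree engine, and the empty password are assumptions to adapt to your environment.

import java.sql.DriverManager

object CreateCkTable {
  def main(args: Array[String]): Unit = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    // 8123 is ClickHouse's default HTTP port; replace the host and credentials with your own
    val conn = DriverManager.getConnection("jdbc:clickhouse://XX.XX.XX.XX:8123", "default", "")
    val stmt = conn.createStatement()
    stmt.execute("create database if not exists ChinaDW")
    stmt.execute(
      """create table if not exists ChinaDW.testken (
        |  userid      Int32,
        |  items       String,
        |  create_date DateTime
        |) engine = MergeTree() order by create_date""".stripMargin)
    stmt.close()
    conn.close()
  }
}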
package TopNitems

import java.sql.PreparedStatement
import java.text.SimpleDateFormat
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.api.scala._

// With the current flink-connector-jdbc, calling JdbcSink from the Scala API runs into a
// serialization problem with lambda functions, so the JDBC statement builder has to be
// passed in as a manually implemented interface rather than a lambda.
class CkSinkBuilder extends JdbcStatementBuilder[(Int, String, String)] {
  def accept(ps: PreparedStatement, v: (Int, String, String)): Unit = {
    ps.setInt(1, v._1)
    ps.setString(2, v._2)
    ps.setString(3, v._3)
  }
}

object To_CK {
  def main(args: Array[String]): Unit = {
    // Set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 keeps console output in order
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // Flink defaults to processing time; switch to event time
    val tEnv = StreamTableEnvironment.create(env) // Table environment (not used further in this snippet)

    // Read data from Kafka
    val pros = new Properties()
    pros.setProperty("bootstrap.servers", "192.168.226.10:9092")
    pros.setProperty("group.id", "test")
    pros.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    pros.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    pros.setProperty("auto.offset.reset", "latest")
    val dataSource = env.addSource(new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), pros))

    val sql = "insert into ChinaDW.testken (userid, items, create_date) values (?, ?, ?)"
    val result = dataSource.map(line => {
      val x = line.split("\t")
      // print("received", x(0), x(1), x(2), "\n")
      val member_id = x(0).trim.toLong
      val item = x(1).trim
      val times = x(2).trim
      var time = 0L
      // parse the datetime string into an epoch-millisecond timestamp
      try { time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(times).getTime }
      catch { case e: Exception => print(e.getMessage) }
      (member_id.toInt, item, time)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(Int, String, Long)](Time.seconds(2)) {
      override def extractTimestamp(t: (Int, String, Long)): Long = t._3
    }).map(x => (x._1, x._2, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(x._3))) // convert the timestamp back to a datetime string for ClickHouse

    // result.print()
    result.addSink(JdbcSink.sink[(Int, String, String)](
      sql,
      new CkSinkBuilder,
      new JdbcExecutionOptions.Builder().withBatchSize(5).build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:clickhouse://XX.XX.XX.XX:8123")
        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
        .withUsername("default")
        .build()
    ))
    env.execute("To_CK")
  }
}
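Once the job is running, a quick read-back through the same JDBC driver confirms that rows are landing in ClickHouse. This is a sketch that is not part of the original article; host, credentials, and the row limit are placeholders.

import java.sql.DriverManager

object CheckCkRows {
  def main(args: Array[String]): Unit = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    val conn = DriverManager.getConnection("jdbc:clickhouse://XX.XX.XX.XX:8123", "default", "")
    // print the 10 most recent rows written by the Flink job
    val rs = conn.createStatement().executeQuery(
      "select userid, items, create_date from ChinaDW.testken order by create_date desc limit 10")
    while (rs.next()) {
      println(s"${rs.getInt("userid")}\t${rs.getString("items")}\t${rs.getString("create_date")}")
    }
    conn.close()
  }
}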
That's all for today's sharing. Thank you, everyone.
Author: lbship
Copyright belongs to the author; this public account reposts the article with the author's exclusive authorization.